#!/usr/bin/python
#
# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.


"""Command-line interface for miscellaneous factory actions.

Run "factory --help" for more info and a list of subcommands.

To add a subcommand, just add a new Subcommand subclass to this file.
"""


import argparse
import csv
import inspect
import logging
import re
import socket
import sys
import time
import yaml
from setproctitle import setproctitle

import factory_common  # pylint: disable=W0611
from cros.factory.test import factory
from cros.factory.test import phase
from cros.factory.test import shopfloor
from cros.factory.test.factory import TestState
from cros.factory.test.test_lists import test_lists
from cros.factory.test import utils
from cros.factory.utils.process_utils import Spawn


class Subcommand(object):
  """A 'factory' subcommand.

  Properties:
    name: The name of the command (set by the subclass).
    help: Help text for the command (set by the subclass).
    parser: The ArgumentParser object.
    subparser: The subparser object created with parser.add_subparsers.
    subparsers: A collection of all subparsers.
    args: The parsed arguments.
  """
  name = None  # Overridden by subclass
  help = None  # Overridden by subclass

  parser = None
  args = None
  subparser = None
  subparsers = None

  def Init(self):
    """Initializes the subparser.

    May be implemented the subclass, which may use "self.subparser" to
    refer to the subparser object.
    """
    pass

  def Run(self):
    """Runs the command.

    Must be implemented by the subclass.
    """
    raise NotImplementedError()


class HelpCommand(Subcommand):
  name = 'help'
  help = 'Get help on COMMAND'

  def Init(self):
    self.subparser.add_argument('command', metavar='COMMAND', nargs='?')

  def Run(self):
    if self.args.command:
      choice = self.subparsers.choices.get(self.args.command)
      if not choice:
        sys.exit('Unknown subcommand %r' % self.args.command)
      choice.print_help()
    else:
      self.parser.print_help()


class RunCommand(Subcommand):
  name = 'run'
  help = 'Run a test'

  def Init(self):
    self.subparser.add_argument(
        'id', metavar='ID',
        help='ID of the test to run')

  def Run(self):
    run_id = factory.get_state_instance().RunTest(self.args.id)
    print 'Running test %s' % self.args.id
    print 'Active test run ID: %s' % run_id


class WaitCommand(Subcommand):
  name = 'wait'
  help = ('Wait for all tests to finish running, displaying status as testing '
          'progresses')

  def Init(self):
    self.subparser.add_argument(
        '--poll-interval', type=int, default=1,
        help='Poll interval in seconds')

  def Run(self):
    # Dict mapping test path -> test information.
    last_test_dict = None

    while True:
      tests = factory.get_state_instance().GetTests()
      test_dict = {}
      for t in tests:
        # Don't bother showing parent nodes.
        if t['parent']:
          continue
        if last_test_dict is None:
          # First time; just print active tests
          if t['status'] == TestState.ACTIVE:
            print '%s: %s' % (t['path'], t['status'])
        else:
          # Show any tests with changed statuses.
          if t['status'] != last_test_dict[t['path']]['status']:
            sys.stdout.write('%s: %s' % (t['path'], t['status']))
            if t['status'] == TestState.FAILED:
              sys.stdout.write(' (%r)' % str(t['error_msg']))
            sys.stdout.write('\n')

        test_dict[t['path']] = t

      # Save the test information for next time.
      last_test_dict = test_dict
      sys.stdout.flush()
      if not any(t['pending'] for t in tests):
        # All done!  Bail.
        print 'done'
        break
      # Wait one second and poll again
      time.sleep(1)


class RunStatusCommand(Subcommand):
  name = 'run-status'
  help = 'Show information about a test run'

  def Init(self):
    self.subparser.add_argument(
        '--id', default=None, help='ID of the test run')

  def Run(self):
    goofy = factory.get_state_instance()
    run_status = goofy.GetTestRunStatus(self.args.id)
    print 'status: %s' % run_status['status']
    if 'run_id' in run_status:
      print 'run_id: %s' % run_status['run_id']
      print 'scheduled_tests:'
      # Simply call 'tests' subcommand to print out information about the
      # scheduled tests.
      args = self.parser.parse_args(['tests', '--this-run', '--status'])
      args.subcommand.args = args
      args.subcommand.Run()


class TestsCommand(Subcommand):
  name = 'tests'
  help = 'Show information about tests'

  def Init(self):
    self.subparser.add_argument(
        '--interesting', '-i', action='store_true',
        help=('Show only information about "interesting" tests '
              '(tests that are not untested or passed'))
    self.subparser.add_argument(
        '--status', '-s', action='store_true',
        help='Include information about test status')
    self.subparser.add_argument(
        '--yaml', action='store_true',
        help='Show lots of information in YAML format')
    self.subparser.add_argument(
        '--this-run', action='store_true',
        help='Show only information about current active run')

  def Run(self):
    goofy = factory.get_state_instance()
    tests = goofy.GetTests()

    # Ignore parents
    tests = [x for x in tests if not x.get('parent')]

    if self.args.interesting:
      tests = [
          x for x in tests if x['status'] in [
              TestState.ACTIVE, TestState.FAILED]]

    if self.args.this_run:
      scheduled_tests = (
          goofy.GetTestRunStatus(None).get('scheduled_tests') or [])
      scheduled_tests = set([t['path'] for t in scheduled_tests])
      tests = [
          x for x in tests if x['path'] in scheduled_tests]

    if self.args.yaml:
      print yaml.safe_dump(tests)
    elif self.args.status:
      for t in tests:
        sys.stdout.write(t['path'])
        if t['status'] != TestState.UNTESTED:
          sys.stdout.write(': %s' % t['status'])
        if t['error_msg']:
          sys.stdout.write(': %r' % str(t['error_msg']))
        sys.stdout.write('\n')
    else:
      for t in tests:
        print t['path']


class ClearCommand(Subcommand):
  name = 'clear'
  help = 'Stop all tests and clear test state'

  def Run(self):
    factory.get_state_instance().ClearState()


class StopCommand(Subcommand):
  name = 'stop'
  help = 'Stop all tests'

  def Run(self):
    factory.get_state_instance().StopTest()


class DumpTestListCommand(Subcommand):
  name = 'dump-test-list'
  help = 'Dump a test list in YAML format'

  def Init(self):
    self.subparser.add_argument(
        '--format', metavar='FORMAT',
        help='Format in which to dump test list',
        default='yaml',
        choices=('yaml', 'csv'))
    self.subparser.add_argument(
        'id', metavar='ID', help='ID of test list to dump')

  def Run(self):
    test_list = test_lists.BuildTestList(self.args.id)
    if isinstance(test_list, test_lists.OldStyleTestList):
      test_list = test_list.Load()

    if self.args.format == 'csv':
      writer = csv.writer(sys.stdout)
      writer.writerow(('id', 'module'))
      for t in test_list.walk():
        if t.is_leaf():
          if t.pytest_name:
            module = t.pytest_name
          elif t.autotest_name:
            module = t.autotest_name
          elif t.invocation_target:
            module = repr(t.invocation_target)
          else:
            module = ''

          writer.writerow((t.path, module))
    else:
      test_lists.YamlDumpTestListDestructive(test_list, sys.stdout)


class TestListCommand(Subcommand):
  name = 'test-list'
  help = ('Set or get the active test list, and/or list all test lists. '
          'Note that generic test list is allowed only when there is no '
          'main test list or when factory test automation is enabled.')

  TIMEOUT_SECS = 60
  POLL_INTERVAL_SECS = 0.5

  def Init(self):
    self.subparser.add_argument(
        'id', metavar='ID', nargs='?',
        help=('ID of test list to activate (run '
              '"factory test-list --list" to see all available IDs)'))
    self.subparser.add_argument(
        '--list', action='store_true',
        help='List all available test lists')
    self.subparser.add_argument(
        '--restart', action='store_true',
        help='Restart goofy and wait for new test list to come up')
    self.subparser.add_argument(
        '--clear-all', '-a', action='store_true',
        help='If restarting goofy, clear all state (like factory_restart -a)')

  def Run(self):
    if self.args.id:
      all_test_lists = test_lists.BuildAllTestLists(force_generic=True)

      if self.args.id not in all_test_lists:
        sys.exit('Unknown test list ID %r (use "factory test-list --list" to '
                 'see available test lists' % self.args.id)
      test_lists.SetActiveTestList(self.args.id)
      print 'Set active test list to %s (wrote %r to %s)' % (
          self.args.id, self.args.id, test_lists.ACTIVE_PATH)
      sys.stdout.flush()
    else:
      print test_lists.GetActiveTestListId()

    if self.args.list:
      all_test_lists = test_lists.BuildAllTestLists(force_generic=True)
      active_id = test_lists.GetActiveTestListId()

      line_format = '%-8s %-20s %s'
      print line_format % ('ACTIVE?', 'ID', 'PATH')

      for k, v in sorted(all_test_lists.items()):
        is_active = '(active)' if k == active_id else ''
        path = (v.path if isinstance(v, test_lists.OldStyleTestList)
                else v.source_path)
        print line_format % (is_active, k, path)

    if self.args.restart:
      goofy = factory.get_state_instance()

      # Get goofy's current UUID
      try:
        uuid = goofy.GetGoofyStatus()['uuid']
      except socket.error:
        logging.info('goofy is not up')
      except:  # pylint: disable=W0702
        logging.exception('Unable to get goofy status; assuming it is down')
        uuid = None

      # Set the proc title so factory_restart won't kill us.
      setproctitle('factory set-active-test-list')

      # Restart goofy, clearing its state
      Spawn(['factory_restart'] +
            (['-a'] if self.args.clear_all else []),
            check_call=True, log=True)

      # Wait for goofy to come up with a different UUID
      start = time.time()

      last_status_summary = None

      while True:
        try:
          status = goofy.GetGoofyStatus()
          status_summary = str(status)
          if status['uuid'] == uuid:
            # goofy hasn't shut down yet
            continue
          if status['status'] == 'RUNNING':
            # All good
            logging.info(status_summary)
            logging.info('goofy is up')
            if status['test_list_id'] != self.args.id:
              # Shouldn't ever happen
              sys.exit('goofy came up with wrong test list %r' %
                       status['test_list_id'])
            return
          if status['status'] not in ['UNINITIALIZED', 'INITIALIZING']:
            # This means it's never going to come up.
            sys.exit('goofy failed to come up; status is %r',
                     status['status'])
        except:  # pylint: disable=W0702
          status_summary = 'Exception: %s' % utils.FormatExceptionOnly()
          if 'Connection refused' in status_summary:
            # Still waiting for goofy to open its RPC; print a friendly
            # error message
            status_summary = (
                'Waiting patiently for goofy to accept RPC connections...')

        if status_summary != last_status_summary:
          logging.info(status_summary)
          last_status_summary = status_summary
        if time.time() - start >= self.TIMEOUT_SECS:
          sys.exit('goofy did not come up after %s seconds' % self.TIMEOUT_SECS)
        time.sleep(self.POLL_INTERVAL_SECS)


class DeviceDataCommand(Subcommand):
  name = 'device-data'
  help = 'Show the contents of the device data dictionary'

  def Init(self):
    self.subparser.add_argument(
        'set', metavar='KEY=VALUE', nargs='*',
        help=('(To be used only manually for debugging) '
              'Sets a device data KEY to VALUE. If VALUE is one of '
              '["True", "true", "False", "false"], then it is considered '
              'a bool. If it is "None" then it is considered to be None. '
              'If it can be coerced to an int, it is considered an int. '
              'Otherwise, it is considered a string. '
              'To avoid type ambiguity, if you need to programmatically '
              'modify device data, don\'t use this; use --set-yaml.'))
    self.subparser.add_argument(
        '--set-yaml', metavar='FILE',
        help=('Read FILE (or stdin if FILE is "-") as a YAML dictionary '
              'and set device data.'))
    self.subparser.add_argument(
        '--delete', '-d', metavar='KEY', nargs='*',
        help='Deletes KEYs from device data. '
             '"factory device-data -d A B C" deletes A, B, C from device-data.')

  def Run(self):
    if self.args.set:
      update = {}
      for item in self.args.set:
        match = re.match(r'^([^=]+)=(.*)$', item)
        if not match:
          sys.exit('--set argument %r should be in the form KEY=VALUE')

        key, value = match.groups()
        if value in ['True', 'true']:
          value = True
        elif value in ['False', 'false']:
          value = False
        elif value == 'None':
          value = None
        else:
          try:
            value = int(value)
          except ValueError:
            pass  # No sweat

        update[key] = value
      shopfloor.UpdateDeviceData(update, post_update_event=False)
      factory.get_state_instance().UpdateSkippedTests()

    if self.args.delete:
      shopfloor.DeleteDeviceData(self.args.delete, post_update_event=False)
      factory.get_state_instance().UpdateSkippedTests()

    if self.args.set_yaml:
      if self.args.set_yaml == '-':
        update = yaml.load(sys.stdin)
      else:
        with open(self.args.set_yaml) as f:
          update = yaml.load(f)
      if type(update) != dict:
        sys.exit('Expected a dict but got a %r' % type(update))
      shopfloor.UpdateDeviceData(update, post_update_event=False)
      factory.get_state_instance().UpdateSkippedTests()

    sys.stdout.write(
        yaml.safe_dump(shopfloor.GetDeviceData(),
                       default_flow_style=False))


class ScreenshotCommand(Subcommand):
  name = 'screenshot'
  help = 'Take a screenshot of the Goofy tab that runs the factory test UI'

  def Init(self):
    self.subparser.add_argument(
        'output_file', metavar='OUTPUT_FILE', nargs='?',
        help=('The output filepath to save the captured screen as a PNG file.  '
              'If not provided, defaults to /var/log/screenshot_<TIME>.png.'))

  def Run(self):
    factory.get_state_instance().DeviceTakeScreenshot(self.args.output_file)


class PhaseCommand(Subcommand):
  name = 'phase'
  help = 'Query or set the current phase'

  def Init(self):
    self.subparser.add_argument(
        '--set', metavar='PHASE',
        help='Sets the current phase (one of %(choices)s)',
        choices=phase.PHASE_NAMES + ['None'])

  def Run(self):
    if self.args.set:
      phase.SetPersistentPhase(None if self.args.set in ['None', '']
                               else self.args.set)
    print phase.GetPhase()


def main():
  factory.init_logging()
  parser = argparse.ArgumentParser(
      description=(
          'Miscellaneous factory commands for use on DUTs (devices under '
          'test). Use "factory help COMMAND" for more info on a '
          'subcommand.'))
  subparsers = parser.add_subparsers(title='subcommands')

  for _, v in sorted(globals().items()):
    if v != Subcommand and inspect.isclass(v) and issubclass(v, Subcommand):
      subcommand = v()
      assert subcommand.name
      assert subcommand.help
      v.parser = parser
      v.subparsers = subparsers
      v.subparser = subparsers.add_parser(subcommand.name, help=subcommand.help)
      v.subparser.set_defaults(subcommand=subcommand)
      subcommand.Init()

  args = parser.parse_args()
  args.subcommand.args = args
  args.subcommand.Run()


if __name__ == '__main__':
  main()
